Kotlin协程原理

协程原理

suspend 原理

suspend 方法

示例代码:

// Example suspend function with a single suspension point (delay). The compiler turns it into a
// state machine driven by an extra, hidden Continuation parameter.
suspend fun susFun() {
    delay(100) // suspension point: the state machine sets label = 1 before calling delay
    println("hello suspend function.")
}

反编译后:

// jadx output for susFun. jadx could not fully decompile the method, so its body is emitted as a
// raw register-level listing inside a block comment. Outline of what the listing does:
//   - if r7 is already a SuspendFunTestKt$susFun$1 whose label has the Integer.MIN_VALUE bit set
//     (set by its invokeSuspend on resume), clear that bit and reuse it as the state object;
//   - otherwise wrap the caller's continuation r7 in a new SuspendFunTestKt$susFun$1;
//   - switch on label: 0 -> set label = 1 and call delay(100), returning COROUTINE_SUSPENDED if
//     delay suspended; 1 -> resumed after delay, fall through to println; other -> IllegalStateException.
public static final Object susFun(Continuation<? super kotlin.Unit> r7) {
    /*
        r4 = -2147483648(0xffffffff80000000, float:-0.0)
        boolean r2 = r7 instanceof me.hacket.coroutine.SuspendFunTestKt.susFun.1 // 内部类SuspendFunTestKt$susFun$1
        if (r2 == 0) goto L_0x0026
        r2 = r7
        me.hacket.coroutine.SuspendFunTestKt$susFun$1 r2 = (me.hacket.coroutine.SuspendFunTestKt.susFun.1) r2
        int r3 = r2.label
        r3 = r3 & r4
        if (r3 == 0) goto L_0x0026
        int r3 = r2.label
        int r3 = r3 - r4
        r2.label = r3
    L_0x0013:
        java.lang.Object r1 = r2.result
        java.lang.Object r3 = kotlin.coroutines.intrinsics.IntrinsicsKt.getCOROUTINE_SUSPENDED()
        int r4 = r2.label
        switch(r4) {
            case 0: goto L_0x002d;
            case 1: goto L_0x003d;
            default: goto L_0x001e;
        }
    L_0x001e:
        java.lang.IllegalStateException r2 = new java.lang.IllegalStateException
        java.lang.String r3 = "call to 'resume' before 'invoke' with coroutine"
        r2.<init>(r3)
        throw r2
    L_0x0026:
        me.hacket.coroutine.SuspendFunTestKt$susFun$1 r0 = new me.hacket.coroutine.SuspendFunTestKt$susFun$1
        r0.<init>(r7)
        r2 = r0
        goto L_0x0013
    L_0x002d:
        kotlin.ResultKt.throwOnFailure(r1)
        r4 = 100
        r6 = 1
        r2.label = r6
        java.lang.Object r2 = kotlinx.coroutines.DelayKt.delay(r4, r2)
        if (r2 != r3) goto L_0x0040
        r2 = r3
    L_0x003c:
        return r2
    L_0x003d:
        kotlin.ResultKt.throwOnFailure(r1)
    L_0x0040:
        java.lang.String r2 = "hello suspend function."
        java.io.PrintStream r3 = java.lang.System.out
        r3.println(r2)
        kotlin.Unit r2 = kotlin.Unit.INSTANCE
        goto L_0x003c
        switch-data {0->0x002d, 1->0x003d, }
    */
}
// State object generated for susFun: a ContinuationImpl that stores the state-machine label and the
// most recent resume result. On resume, invokeSuspend records the result, sets the Integer.MIN_VALUE
// bit in label (so susFun can tell it is being re-entered with this object) and calls susFun again.
final class SuspendFunTestKt$susFun$1 extends ContinuationImpl {
    // Position in susFun's state machine (plus the MIN_VALUE re-entry flag while resuming).
    int label;
    // Value (or failure) delivered by the last resume; read by susFun as `result`.
    /* synthetic */ Object result;

    // `continuation` is susFun's caller; it becomes this object's completion.
    SuspendFunTestKt$susFun$1(Continuation continuation) {
        super(continuation);
    }

    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        this.result = $result;
        this.label |= Integer.MIN_VALUE;
        return SuspendFunTestKt.susFun(this);
    }
}

suspend lambda 是个什么东西?

示例代码:

// Inspects the runtime class of a suspend lambda: the compiler emits a dedicated class whose
// superclass is SuspendLambda (see the jadx output for SuspendTestKt$main$mySuspend1$1).
fun main() {
    val mySuspend1: suspend () -> String = {
        delay(1000)
        "hehe"
    }
    val javaClass = mySuspend1.javaClass
    println("javaClass=${javaClass.simpleName}") // main$mySuspend1$1
    println("javaClass.superclass=${javaClass.superclass.simpleName}") // SuspendLambda
}

我们用 jadx 反编译看看:

// jadx output for the file facade of main(). The suspend lambda is instantiated with a null
// completion only so that getClass() can be called on it; it is never started here.
public final class SuspendTestKt {
    public static final void main() {
        Class javaClass = new main.mySuspend1.1((Continuation) null).getClass();
        System.out.println((Object) ("javaClass=" + javaClass.getSimpleName()));
        StringBuilder append = new StringBuilder().append("javaClass.superclass=");
        Class<? super Object> superclass = javaClass.getSuperclass();
        Intrinsics.checkExpressionValueIsNotNull(superclass, "javaClass.superclass");
        System.out.println((Object) append.append(superclass.getSimpleName()).toString());
    }
}
// 内部类SuspendTestKt$main$mySuspend1$1
// Class generated for the `suspend () -> String` lambda: a SuspendLambda that also implements
// Function1, whose single argument is the implicit Continuation.
final class SuspendTestKt$main$mySuspend1$1 extends SuspendLambda implements Function1<Continuation<? super String>, Object> {
    // Position in the lambda's state machine.
    int label;

    // super(1, ...) passes arity 1 (the implicit Continuation parameter) and the completion.
    SuspendTestKt$main$mySuspend1$1(Continuation continuation) {
        super(1, continuation);
    }

    // Creates a fresh state-machine instance bound to the given completion.
    @NotNull
    public final Continuation<Unit> create(@NotNull Continuation<?> continuation) {
        Intrinsics.checkParameterIsNotNull(continuation, "completion");
        return new SuspendTestKt$main$mySuspend1$1(continuation);
    }

    // Calling the lambda directly: create a new instance and run it from label 0 with a Unit
    // placeholder as the initial result.
    public final Object invoke(Object obj) {
        return create((Continuation) obj).invokeSuspend(Unit.INSTANCE);
    }

    // State machine: label 0 runs up to delay(1000) and returns COROUTINE_SUSPENDED if it suspends;
    // label 1 is the resume point after delay. Both paths finish by returning "hehe".
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (this.label) {
            case 0:
                ResultKt.throwOnFailure($result);
                this.label = 1;
                if (DelayKt.delay(1000, this) == coroutine_suspended) {
                    return coroutine_suspended;
                }
                break;
            case 1:
                ResultKt.throwOnFailure($result);
                break;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        return "hehe";
    }
}

由反编译结果可以看到,suspend () -> String 被编译成一个继承 SuspendLambda 的类,并实现了 Function1 接口——这唯一的参数就是编译器隐式传入的 Continuation。

协程原理

以下面代码为例,分析协程执行原理:

// Launches one coroutine on Dispatchers.IO from a custom scope and keeps the JVM alive long enough
// for it to print. MyContextScope is a user-defined CoroutineScope that is not shown here.
fun main() {
    val scope = MyContextScope(EmptyCoroutineContext)
    scope.launch(Dispatchers.IO) {
        println("hello world. ${Thread.currentThread().name}")
    }
    // launch does not block; sleep so main does not return before the IO worker has printed.
    Thread.sleep(2000)
}

输出:

hello world. DefaultDispatcher-worker-1

反编译生成代码

反编译生成的代码,launch 的第三个参数 block 会生成一个 SuspendLambda 内部类:

// Decompiled main(): `launch` becomes a call to the synthetic launch$default. The trailing int 2 is
// the default-argument bitmask (bit for `start` set, so CoroutineStart.DEFAULT is used). The block is
// passed as an instance of the generated SuspendLambda subclass main.1 with a null completion.
public final class HahaKt {
    public static final void main() {
        BuildersKt.launch$default((CoroutineScope) new MyContextScope(EmptyCoroutineContext.INSTANCE), Dispatchers.getIO(), (CoroutineStart) null, new main.1((Continuation) null), 2, (Object) null);
        Thread.sleep(2000);
    }
}
// launch的第三个参数block会生成内部类HahaKt$main$1
// Class generated for the launch block `suspend CoroutineScope.() -> Unit`: a SuspendLambda that
// implements Function2<receiver, continuation, result>.
final class HahaKt$main$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
    // Position in the state machine; this body has no suspension point, so only label 0 is used.
    int label;
    // The lambda's receiver (the CoroutineScope), stored by create().
    private CoroutineScope p$;

    // super(2, ...) passes arity 2 (receiver + implicit Continuation) and the completion.
    HahaKt$main$1(Continuation continuation) {
        super(2, continuation);
    }

    // Builds a fresh instance bound to `continuation` and stores the receiver in p$.
    @NotNull
    public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<?> continuation) {
        Intrinsics.checkParameterIsNotNull(continuation, "completion");
        HahaKt$main$1 hahaKt$main$1 = new HahaKt$main$1(continuation);
        CoroutineScope coroutineScope = (CoroutineScope) value;
        hahaKt$main$1.p$ = (CoroutineScope) value;
        return hahaKt$main$1;
    }

    // Calling the lambda: create a new instance and run it from label 0 with a Unit placeholder result.
    public final Object invoke(Object obj, Object obj2) {
        return create(obj, (Continuation) obj2).invokeSuspend(Unit.INSTANCE);
    }

    // The lambda body: print and complete with Unit (the COROUTINE_SUSPENDED lookup result is unused).
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (this.label) {
            case 0:
                ResultKt.throwOnFailure($result);
                CoroutineScope coroutineScope = this.p$;
                StringBuilder append = new StringBuilder().append("hello world. ");
                Thread currentThread = Thread.currentThread();
                Intrinsics.checkExpressionValueIsNotNull(currentThread, "Thread.currentThread()");
                System.out.println((Object) append.append(currentThread.getName()).toString());
                return Unit.INSTANCE;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
    }
}

协程的创建

CoroutineScope.launch

首先看 launch:

// Builders.common.kt (extension function on CoroutineScope)
// Creates the coroutine object (it is also the returned Job) and starts it according to `start`.
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    // 根据父级创建新的上下文(协程的父级上下文)
    val newContext = newCoroutineContext(context)
    // LAZY -> LazyStandaloneCoroutine (keeps `block` for a later start); otherwise an active StandaloneCoroutine.
    val coroutine = if (start.isLazy)
        // 协程真正的上下文生成是以newContext作为父级上下文生成的
        LazyStandaloneCoroutine(newContext, block) else 
        StandaloneCoroutine(newContext, active = true)
    // The coroutine object is passed both as the block's receiver and (inside start) as its completion.
    coroutine.start(start, coroutine, block)
    return coroutine
}
// Extension on CoroutineScope: `coroutineContext` below is the scope's own context property.
// Merges it with `context`, adds a CoroutineId in debug mode, and falls back to Dispatchers.Default
// when the merged context has no ContinuationInterceptor.
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = coroutineContext + context
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}

AbstractCoroutine.start

现在看看 AbstractCoroutine.start:

// AbstractCoroutine.kt
// Base class of the coroutine objects built by launch/async. One object plays three roles: a Job
// (via JobSupport), the top-level Continuation<T> (completion) of the coroutine body, and a CoroutineScope.
public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {

    // Attaches this coroutine to the parent Job found in parentContext (null if there is none).
    init {
        if (initParentJob) initParentJob(parentContext[Job])
    }
    
    // receiver: StandaloneCoroutine
    // block: suspend StandaloneCoroutine.() -> Unit
    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        // initParentJob() is not called here: parent registration already happened in init above.
        start(block, receiver, this) // 等同于start.invoke() ,注意第3个参数completion:this
    }
}

start(block, receiver, this) 实际调用的是 CoroutineStart.invoke() 运算符;对 launch 来说,receiver 和 completion 都是协程对象本身(StandaloneCoroutine)。

看看 StandaloneCoroutine:

// Coroutine object created by a non-lazy launch. It registers with its parent (initParentJob = true),
// and an exception passed to handleJobException is reported via handleCoroutineException, returning true.
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}

CoroutineStart.invoke

调用到了 CoroutineStart,CoroutineStart 是一个枚举类,接着看 CoroutineStart#invoke() 方法:

// CoroutineStart
// Start modes of a coroutine builder; invoke() maps each mode to a different start helper.
public enum class CoroutineStart {
    DEFAULT, LAZY, ATOMIC, UNDISPATCHED;
    
    public val isLazy: Boolean get() = this === LAZY
    
    // block - suspend StandaloneCoroutine.() -> Unit,为SuspendLambda
    // receiver - StandaloneCoroutine
    // completion - StandaloneCoroutine<Unit>
    public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
    // 根据 start 参数的类型调用不同的方法
    when (this) {
        // DEFAULT: dispatched, cancellable start (the path followed in this article).
        CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
        CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
        CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        CoroutineStart.LAZY -> Unit // will start lazily
    }
}

然后调用 startCoroutineCancellable()

(suspend (R) -> T).startCoroutineCancellable 创建 Continuation

这里我们看 CoroutineStart.DEFAULT,然后调用了 block.startCoroutineCancellable(receiver, completion)

// Cancellable.kt
// Creates the coroutine for this suspend lambda, wraps it with the context's interceptor (dispatcher)
// and starts it with a cancellable resume.
// receiver - StandaloneCoroutine
// completion - StandaloneCoroutine<Unit>
// onCancellation - optional callback forwarded to resumeCancellableWith; CoroutineStart.invoke calls
//                  this with two arguments, so launch leaves it null.
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R,
    completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) { // on any exception, fail the coroutine via completion.resumeWith(Result.failure(e))
        createCoroutineUnintercepted(receiver, completion)
            .intercepted()
            .resumeCancellableWith(Result.success(Unit), onCancellation)
    }

// Runs `block`; if it throws, the exception is delivered to `completion` as a failed Result
// instead of propagating to the caller.
private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) {
    try {
        block()
    } catch (e: Throwable) {
        completion.resumeWith(Result.failure(e))
    }
}
(suspend (R) -> T).createCoroutineUnintercepted

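createCoroutineUnintercepted 负责创建协程体对应的 Continuation。下面列出的是不带接收者的重载 (suspend () -> T).createCoroutineUnintercepted(completion);launch 实际调用的是带接收者的重载 createCoroutineUnintercepted(receiver, completion),两者逻辑相同,后者在 BaseContinuationImpl 分支中调用 create(receiver, probeCompletion):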

// IntrinsicsJvm.kt (kotlin.coroutines.intrinsics, Kotlin stdlib)
// Overload without a receiver. launch goes through the receiver overload
// createCoroutineUnintercepted(receiver, completion), which calls create(receiver, probeCompletion).
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(completion: Continuation<T>):Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    // Compiler-generated suspend lambdas are BaseContinuationImpl subclasses: create() a fresh instance.
    return if (this is BaseContinuationImpl)
        create(probeCompletion)
    else
        // Any other implementation of the function type is adapted via createCoroutineFromSuspendFunction.
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function1<Continuation<T>, Any?>).invoke(it)
        }
}

// https://github.com/JetBrains/kotlin/blob/master/libraries/stdlib/jvm/src/kotlin/coroutines/jvm/internal/DebugProbes.kt
// Debug hook: returns `completion` unchanged; a debugger may substitute its own implementation.
internal fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T> {
    /** implementation of this function is replaced by debugger */
    return completion
}

接着看 create:launch 的 block 带有接收者,所以调用的是两参数版本 create(receiver, completion),它创建的 Continuation 是一个新的 SuspendLambda 子类对象(即协程体本身)。
看看 create 反编译后的代码:

// Decompiled create(value, completion) of a suspend-lambda class: instantiates a new copy of the
// lambda class bound to `completion`; that copy is the Continuation that gets started.
@NotNull
 public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
    Intrinsics.checkNotNullParameter(completion, "completion");
    Function2 var3 = new <anonymous constructor>(completion);
    return var3;
 }

接着看 intercepted()

Continuation.intercepted() 返回 DispatchedContinuation

接着回到 startCoroutineCancellable 看 intercepted(),通过 ContinuationInterceptor 拦截当前 Continuation

// IntrinsicsJvm.kt (kotlin.coroutines.intrinsics, Kotlin stdlib)
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    // If this is a ContinuationImpl, delegate to its intercepted(); otherwise return this unchanged.
    // In the launch example `this` is the HahaKt$main$1 instance (a SuspendLambda, hence a ContinuationImpl).
    (this as? ContinuationImpl)?.intercepted() ?: this

接着看 ContinuationImpl.intercepted()

// ContinuationImpl (excerpt)
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    // Cached result of intercepted(); filled on the first call.
    private var intercepted: Continuation<Any?>? = null
    public fun intercepted(): Continuation<Any?> =
        // The interceptor is normally a CoroutineDispatcher (Dispatchers.IO in the launch example);
        // when the context has no ContinuationInterceptor, `this` itself is cached and returned.
        intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
        .also { intercepted = it }
}

如果 intercepted 已经有值就直接返回;否则从 CoroutineContext 中取出 ContinuationInterceptor,调用其 interceptContinuation() 包装当前 Continuation(上下文中没有拦截器时就用自身),并把结果缓存到 intercepted 中。

CoroutineDispatcher 实现了 ContinuationInterceptor

// CoroutineDispatcher (implements ContinuationInterceptor)
// Wraps `continuation` in a DispatchedContinuation so that resuming it goes through this dispatcher.
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)

所以 intercepted() 分情况:

  1. 需要线程调度 - 返回 DispatchedContinuation,其 continuation 参数值为 SuspendLambda
  2. 不需要线程调度 - 返回 SuspendLambda

协程的启动

接下来看看 resumeCancellableWith 是怎么启动协程的,这里还涉及到 Dispatchers 线程调度的逻辑:

DispatchedContinuation

前面 startCoroutineCancellable() 里,如果有线程调度,那么返回的是 DispatchedContinuation;没有的话返回 SuspendLambda

// DispatchedContinuation
// Pairs the real continuation (the SuspendLambda) with its dispatcher. Being a DispatchedTask, it can
// itself be handed to dispatcher.dispatch(context, this). Continuation members not overridden here
// are delegated to `continuation`.
internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {

    // DispatchedTask.run() reads `delegate`; this object is its own delegate.
    override val delegate: Continuation<T>
        get() = this
        
    // Plain resume, using MODE_ATOMIC (DispatchedTask.run only checks the Job for cancellable modes).
    override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) { // 判断是否需要线程调度
            _state = state
            resumeMode = MODE_ATOMIC
            dispatcher.dispatch(context, this) // 将协程的运算分发到另一个线程
        } else {
            executeUnconfined(state, MODE_ATOMIC) {
                withCoroutineContext(this.context, countOrElement) {
                    continuation.resumeWith(result)
                }
            }
        }
    }

    // Cancellable resume used when starting with CoroutineStart.DEFAULT: with MODE_CANCELLABLE,
    // DispatchedTask.run resumes with a CancellationException if the Job is no longer active.
    inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        if (dispatcher.isDispatchNeeded(context)) { // 判断是否需要线程调度
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this) // 将协程的运算分发到另一个线程
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) { // 不需要调度则直接在当前线程执行协程
                    resumeUndispatchedWith(result)
                }
            }
        }
    }
    // Resumes the wrapped continuation directly in the calling thread, inside its coroutine context.
    inline fun resumeUndispatchedWith(result: Result<T>) {
        withContinuationContext(continuation, countOrElement) {
            continuation.resumeWith(result)
        }
    }   
}


// Entry point used by startCoroutineCancellable: goes through the dispatcher when the continuation
// was intercepted into a DispatchedContinuation, otherwise resumes it directly.
public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    // 进行线程调度,最后也会执行到continuation.resumeWith方法
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation) 
    // 直接执行continuation.resumeWith方法
    else -> resumeWith(result)
}
  1. 当需要线程调度时,则在调度后会调用 DispatchedContinuation.continuation.resumeWith 来启动协程,其中 continuation 是 SuspendLambda 实例
  2. 当不需要线程调度时,则直接调用 SuspendLambda.resumeWith 来启动协程

DispatchedContinuation 继承自 DispatchedTask,DispatchedTask 继承自 SchedulerTask(JVM 上即 Task),而 Task 实现了 Runnable,所以 dispatcher 最终执行的是它的 run 方法:

// Task run by the dispatcher's thread. run() takes the state saved before dispatching and resumes
// the underlying continuation with one of: the successful result, the original exception, or (in a
// cancellable mode, when the Job is no longer active) the Job's CancellationException.
internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
    public final override fun run() {
        assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            // For a DispatchedContinuation, delegate is the object itself; continuation is the SuspendLambda.
            val delegate = delegate as DispatchedContinuation<T>
            val continuation = delegate.continuation
            withContinuationContext(continuation, delegate.countOrElement) {
                val context = continuation.context
                val state = takeState() // NOTE: Must take state in any case, even if cancelled
                val exception = getExceptionalResult(state)
                /*
                 * Check whether continuation was originally resumed with an exception.
                 * If so, it dominates cancellation, otherwise the original exception
                 * will be silently lost.
                 */
                val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
                if (job != null && !job.isActive) {
                    val cause = job.getCancellationException()
                    cancelCompletedResult(state, cause)
                    continuation.resumeWithStackTrace(cause)
                } else {
                    if (exception != null) {
                        continuation.resumeWithException(exception)
                    } else {
                        continuation.resume(getSuccessfulResult(state))
                    }
                }
            }
        } catch (e: Throwable) {
            // This instead of runCatching to have nicer stacktrace and debug experience
            fatalException = e
        } finally {
            // afterTask always runs; a throw from it and/or fatalException is passed to handleFatalException.
            val result = runCatching { taskContext.afterTask() }
            handleFatalException(fatalException, result.exceptionOrNull())
        }
    }
}

下面看看 SuspendLambda 的类关系:
SuspendLambda→ContinuationImpl→BaseContinuationImpl→Continuation

resumeWith 方法调用的是父类 BaseContinuationImpl 中的 resumeWith 方法:

// Excerpt (elided with "// ..."): resumeWith ends up calling the subclass's invokeSuspend with
// `param`, the Result being resumed with. The full listing appears further below.
internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
        // ...
        val outcome = invokeSuspend(param)
        // ...
    }
}

SuspendLambda→ContinuationImpl→BaseContinuationImpl→Continuation

SuspendLambda

由前面可知 suspend () -> T 是一个 SuspendLambda,现在看看 SuspendLambda

// Suspension lambdas inherit from this class
// arity is the number of function parameters, including the implicit Continuation.
// completion == null: the object is the lambda value itself (not started);
// otherwise it is a running continuation.
internal abstract class SuspendLambda(
    public override val arity: Int,
    completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
    constructor(arity: Int) : this(arity, null)

    public override fun toString(): String =
        if (completion == null)
            Reflection.renderLambdaToString(this) // this is lambda
        else
            super.toString() // this is continuation
}
ContinuationImpl

SuspendLambda 继承 ContinuationImpl,接着看看 ContinuationImpl

// Continuation base that carries a CoroutineContext and caches its intercepted (dispatched) wrapper.
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    // By default the context is inherited from the completion.
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)

    public override val context: CoroutineContext
        get() = _context!!

    @Transient
    private var intercepted: Continuation<Any?>? = null

    public fun intercepted(): Continuation<Any?> = // 拦截Continuation
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

    // Called when this state machine finishes: gives a real wrapper (not `this`) back to the
    // interceptor, then marks the cache with CompletedContinuation.
    protected override fun releaseIntercepted() {
        val intercepted = intercepted
        if (intercepted != null && intercepted !== this) {
            context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
        }
        this.intercepted = CompletedContinuation // just in case
    }
}
BaseContinuationImpl

ContinuationImpl 又继承 BaseContinuationImpl,接着看 BaseContinuationImpl:

// Root of compiler-generated state machines. resumeWith runs invokeSuspend and, when it finishes,
// passes the outcome up the completion chain without recursing.
internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?) : 
    // completion is resumed when this one finishes: another BaseContinuationImpl (the caller's state machine) or, at the top, the AbstractCoroutine
Continuation<Any?>, CoroutineStackFrame, Serializable {
    
    public final override fun resumeWith(result: Result<Any?>) {
        
        var current = this
        var param = result
        while (true) { // loop instead of recursion; every exit is a `return` below
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        // Suspended: stop here; a later resumeWith call continues this state machine.
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // SuspendLambda是BaseContinuationImpl
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else { // AbstractCoroutine不是BaseContinuationImpl
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

    // Implemented by generated code: runs the state machine from its current label.
    protected abstract fun invokeSuspend(result: Result<Any?>): Any?

    protected open fun releaseIntercepted() {
        // does nothing here, overridden in ContinuationImpl
    }

    // 子类实现,返回一个Continuation
    public open fun create(completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Continuation) has not been overridden")
    }
    // 子类实现,返回一个Continuation
    public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
    }

    public override fun toString(): String =
        "Continuation at ${getStackTraceElement() ?: this::class.java.name}"

    // --- CoroutineStackFrame implementation

    public override val callerFrame: CoroutineStackFrame?
        get() = completion as? CoroutineStackFrame

    public override fun getStackTraceElement(): StackTraceElement? =
        getStackTraceElementImpl()
}

协程体执行完毕后,最终调用的是 AbstractCoroutine 的 resumeWith:

// AbstractCoroutine
// Receives the final outcome of the coroutine body (value or exception). If the state becomes
// COMPLETING_WAITING_CHILDREN it returns early; otherwise afterResume is called with the new state.
public final override fun resumeWith(result: Result<T>) {
    val state = makeCompletingOnce(result.toState())
    if (state === COMPLETING_WAITING_CHILDREN) return
    afterResume(state)
}

协程启动小结

  1. 协程的启动是通过 BaseContinuationImpl.resumeWith 方法调用到了子类 SuspendLambda.invokeSuspend 方法,然后通过状态机来控制顺序运行
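  2. 在 BaseContinuationImpl.resumeWith 中有一个 while (true) 循环,调用 invokeSuspend 执行具体的协程代码:碰到 COROUTINE_SUSPENDED 时直接 return,协程挂起;挂起函数恢复时会再次调用 resumeWith → invokeSuspend 继续执行;invokeSuspend 返回最终结果后,若 completion 仍是 BaseContinuationImpl 就继续循环,否则调用 completion(即 AbstractCoroutine)的 resumeWith 结束协程。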
  3. Kotlin 中的协程存在着三层包装
第一层包装: launch & async 返回的 Job、Deferred(实际对象是 StandaloneCoroutine、DeferredCoroutine 等 AbstractCoroutine 子类),里面封装了协程的状态,提供了 cancel 等接口;
第二层包装: 编译器生成的 SuspendLambda 子类,封装了协程的真正执行逻辑,其继承关系为 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl, 它的 completion 参数就是第一层包装实例;
第三层包装: DispatchedContinuation, 封装了线程调度逻辑,它的 continuation 参数就是第二层包装实例。

hufmb

协程状态机

以下面的代码为例解析一下协程启动的状态机流程:

// Sample for the state-machine walkthrough: two suspend functions, each awaiting a GlobalScope.async
// on Dispatchers.IO, called one after the other from a launched coroutine.
// NOTE: GlobalScope.async inside a suspend function bypasses structured concurrency; this shape is
// kept only because the decompiled listing that follows is based on it.
private suspend fun getId(): String {
    return GlobalScope.async(Dispatchers.IO) {
        delay(1000)
        "hearing"
    }.await()
}

private suspend fun getAvatar(id: String): String {
    return GlobalScope.async(Dispatchers.IO) {
        delay(1000)
        "avatar-$id"
    }.await()
}

fun main() {
    GlobalScope.launch {
        val id = getId()
        val avatar = getAvatar(id)
        println("${Thread.currentThread().name} - $id - $avatar")
    }
    // GlobalScope.launch returns immediately and its worker threads do not keep the JVM alive; wait
    // longer than the two sequential 1000 ms delays so the println runs before main returns.
    Thread.sleep(3000)
}

上面 main 方法中,GlobalScope.launch 启动的协程体在执行到 getId 后,协程体会挂起,直到 getId 返回可用结果,才会 resume launch 协程,执行到 getAvatar 也是同样的过程。

协程内部实现使用状态机来处理不同的挂起点,将 GlobalScope.launch 协程体字节码反编译成 Java 代码,大致如下 (有所删减):

// Decompiled getId (abridged). The async block is an anonymous SuspendLambda (one suspension point:
// delay). async$default starts it (int 2 = default-argument mask: `start` omitted), and getId then
// returns await($completion) directly, so getId suspends exactly when await does.
private static final Object getId(Continuation $completion) {
  return BuildersKt.async$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
     int label;
     @Nullable
     public final Object invokeSuspend(@NotNull Object $result) {
        Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch(this.label) {
        case 0:
           ResultKt.throwOnFailure($result);
           this.label = 1;
           if (DelayKt.delay(1000L, this) == var2) {
              return var2;
           }
           break;
        case 1:
           ResultKt.throwOnFailure($result);
           break;
        default:
           throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        return "hearing";
     }

     @NotNull
     public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
        Intrinsics.checkNotNullParameter(completion, "completion");
        Function2 var3 = new <anonymous constructor>(completion);
        return var3;
     }

     public final Object invoke(Object var1, Object var2) {
        return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
     }
  }), 2, (Object)null).await($completion);
}

// Decompiled getAvatar (abridged): same shape as getId. The anonymous SuspendLambda captures `id`
// (declared final) to build "avatar-" + id after its delay, and getAvatar returns await($completion).
private static final Object getAvatar(final String id, Continuation $completion) {
  return BuildersKt.async$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
     int label;
     @Nullable
     public final Object invokeSuspend(@NotNull Object $result) {
        Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch(this.label) {
        case 0:
           ResultKt.throwOnFailure($result);
           this.label = 1;
           if (DelayKt.delay(1000L, this) == var2) {
              return var2;
           }
           break;
        case 1:
           ResultKt.throwOnFailure($result);
           break;
        default:
           throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        return "avatar-" + id;
     }

     @NotNull
     public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
        Intrinsics.checkNotNullParameter(completion, "completion");
        Function2 var3 = new <anonymous constructor>(completion);
        return var3;
     }

     public final Object invoke(Object var1, Object var2) {
        return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
     }
  }), 2, (Object)null).await($completion);
}

// Decompiled main (abridged). The launch block becomes an anonymous SuspendLambda whose invokeSuspend
// is a state machine over `label` with two suspension points (getId and getAvatar).
// COROUTINE_SUSPENDED stands for IntrinsicsKt.getCOROUTINE_SUSPENDED() (simplified).
public static final void main() {
    BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null,
        (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
        int label;
        // Receiver of the lambda (the CoroutineScope), stored by create().
        private CoroutineScope p$;
        // Spilled local: `id` is saved here so it survives the getAvatar suspension point.
        Object L$1;

        public final Object invokeSuspend(@NotNull Object $result) {
            Object var10000;
            String id;
            label17: {
                CoroutineScope $this$launch;
                switch(this.label) {
                case 0: // a
                    ResultKt.throwOnFailure($result);
                    $this$launch = this.p$;
                    this.label = 1; // label置为1
                    var10000 = getId(this);
                    if (var10000 == COROUTINE_SUSPENDED) {
                        return COROUTINE_SUSPENDED;
                    }
                    // 若此时已经有结果,则不挂起,直接break
                    break;
                case 1: // b
                    ResultKt.throwOnFailure($result);
                    var10000 = $result;
                    break;
                case 2: // d
                    id = (String)this.L$1;
                    ResultKt.throwOnFailure($result);
                    var10000 = $result;
                    break label17; // 退出label17
                default:
                    throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                }
                // c
                id = (String)var10000;
                this.L$1 = id; // 将id赋给L$1
                this.label = 2; // label置为2
                var10000 = getAvatar(id, this);
                if (var10000 == COROUTINE_SUSPENDED) {
                    return COROUTINE_SUSPENDED;
                }
            }
            // e
            String avatar = (String)var10000;
            StringBuilder var9 = new StringBuilder();
            Thread var10001 = Thread.currentThread();
            Intrinsics.checkExpressionValueIsNotNull(var10001, "Thread.currentThread()");
            String var5 = var9.append(var10001.getName()).append(" - ").append(id).append(" - ").append(avatar).toString();
            System.out.println(var5);
            return Unit.INSTANCE;
        }

        @NotNull
        public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkParameterIsNotNull(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            var3.p$ = (CoroutineScope)value;
            return var3;
        }

        public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
        }
    }), 3, (Object)null); // mask 3: both `context` and `start` use their default values
}

invokeSuspend 会在协程启动时,以及协程体中挂起的 suspend 函数得到结果后被调用;调用点在 BaseContinuationImpl.resumeWith 中。

执行流程:

  1. a: launch 协程体刚执行到 getId 方法时,getId 方法的返回值将是 COROUTINE_SUSPENDED, 此时直接 return, 则 launch 协程体中 getId 后面的代码暂时不会执行,即 launch 协程体被挂起 (非阻塞, 该线程依旧会做其它工作)。这里将 label 置为了 1. 而若此时 getId 已经有结果 (内部没有调用 delay 之类的 suspend 函数等),则不挂起,而是直接 break。
  2. b: 若上面 a 中 getId 返回 COROUTINE_SUSPENDED, 则当 getId 有可用结果返回后,会重新执行 launch 协程体的 invokeSuspend 方法,根据上面的 label==1, 会执行到这里检查一下 result 没问题的话就 break, 此时 id 赋值给了 var10000。
  3. c: 在 a 中若直接 break 或 在 b 中得到 getId 的结果然后 break 后,都会执行到这里,得到 id 的值并把 label 置为 2。然后调用 getAvatar 方法,跟 getId 类似,若其返回 COROUTINE_SUSPENDED 则 return,协程被挂起,等到下次 invokeSuspend 被执行,否则离开 label17 接着执行后续逻辑。
  4. d: 若上面 c 中 getAvatar 返回 COROUTINE_SUSPENDED, 则当 getAvatar 有可用结果返回后会重新调用 launch 协程体的 invokeSuspend 方法,此时根据 label==2 来到这里并取得之前的 id 值,检验 result(即 avatar),然后 break label17。
  5. e: c 中直接返回了可用结果 或 d 中 break label17 后,launch 协程体中的 suspend 函数都执行完毕了,这里会执行剩下的逻辑。

协程的挂起和恢复

Kotlin 编译器会为 协程体 生成继承自 SuspendLambda 的子类,协程的真正运算逻辑都在其 invokeSuspend 方法中。

Kotlin 协程的内部实现使用了 Kotlin 编译器的一些编译技术,当 suspend 函数被调用时,都有一个隐式的参数额外传入,这个参数是 Continuation 类型,封装了协程 resume 后执行的代码逻辑。

// Source of getId again; its CPS-transformed (decompiled) form follows.
private suspend fun getId(): String {
    return GlobalScope.async(Dispatchers.IO) {
        delay(1000)
        "hearing"
    }.await()
}

// Decompile成Java
// After compilation the suspend function gains a trailing Continuation parameter and returns Object:
// either its real result or COROUTINE_SUSPENDED.
final Object getId(@NotNull Continuation $completion) {
    // ...
}

其中传入的 $completion 参数,可以看到是调用 getId 方法所在的协程体对象,也就是一个 SuspendLambda 对象。Continuation 的定义如下:

// A continuation: the context the coroutine runs in, plus a callback that resumes it with a
// Result (a value or a failure).
public interface Continuation<in T> {
    public val context: CoroutineContext

    public fun resumeWith(result: Result<T>)
}

将 getId 方法编译后的字节码反编译成 Java 代码如下:

// Decompiled getId (simplified: COROUTINE_SUSPENDED stands for IntrinsicsKt.getCOROUTINE_SUSPENDED()).
// $completion is the caller's state machine; it is passed to await so the caller is resumed later.
final Object getId(@NotNull Continuation $completion) {
    // 新建与启动协程
    return BuildersKt.async$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
        int label;

        @Nullable
        public final Object invokeSuspend(@NotNull Object $result) {
            switch(this.label) {
            case 0:
                ResultKt.throwOnFailure($result);
                this.label = 1;
                if (DelayKt.delay(1000L, this) == COROUTINE_SUSPENDED) {
                    return COROUTINE_SUSPENDED;
                }
                break;
            case 1:
                ResultKt.throwOnFailure($result);
                break;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
            return "hearing";
        }

        // ...
    }), 2, (Object)null).await($completion); // 调用 await() suspend 函数
}
  1. getId 中的 await() 在 async 协程尚未完成(其中的 delay 还没结束)时返回 COROUTINE_SUSPENDED,表示还没有值;getId 把它原样返回,调用方协程随之挂起,但不阻塞线程;
  2. 当 async 协程完成、await 得到结果后,会通过 $completion.resumeWith 恢复调用方,再次执行其 invokeSuspend,协程继续运行。

父子协程

launch:

/**
 * Creates and starts a new coroutine in this scope and returns it as a [Job].
 *
 * @param context extra context, combined with the scope's context by `newCoroutineContext`.
 * @param start start strategy; a lazy strategy creates a LazyStandaloneCoroutine, any other
 *   creates a StandaloneCoroutine with `active = true`.
 * @param block the coroutine body, run with the coroutine as its CoroutineScope receiver.
 */
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    // The coroutine object is both the receiver of `block` and the Job returned to the caller.
    coroutine.start(start, coroutine, block)
    return coroutine
}

// AbstractCoroutine
// On construction, attach this coroutine to the Job found in the parent context (if requested);
// this is what links a child coroutine into its parent's job hierarchy.
init {
    if (initParentJob) initParentJob(parentContext[Job])
}
// Starts `block` with the given start strategy; `this` (the coroutine) is passed as the
// continuation that receives the body's final result.
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    start(block, receiver, this)
}

接着看 initParentJob():

// JobSupport
/**
 * Links this job to [parent] so that cancellation and completion can propagate between them.
 *
 * @param parent the parent coroutine's Job (looked up from the parent CoroutineContext; it is
 *   the Job element, not the context itself), or null when there is no parent, in which case
 *   parentHandle is set to NonDisposableHandle and no link is created.
 */
protected fun initParentJob(parent: Job?) {
    assert { parentHandle == null }
    if (parent == null) {
        parentHandle = NonDisposableHandle
        return
    }
    parent.start() // make sure the parent is started
    // Register this job as a child of the parent; the returned handle is what cancelParent
    // later uses to notify the parent.
    val handle = parent.attachChild(this)
    parentHandle = handle
    // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
    if (isCompleted) {
        handle.dispose()
        parentHandle = NonDisposableHandle // release it just in case, to aid GC
    }
}

接下来重点在于 parent.attachChild 方法:

/**
 * Registers [child] with this (parent) job and returns the handle the child stores as its
 * parentHandle.
 *
 * The ChildHandleNode is installed with `onCancelling = true`, so it runs as soon as this job
 * starts cancelling (to cancel the child). It is also the node the parent walks when it waits
 * for its children before completing.
 */
public final override fun attachChild(child: ChildJob): ChildHandle {
    // ChildHandleNode takes only the child job; the parent is reached through the node's
    // inherited `job` property (kotlinx.coroutines 1.5+ node layout).
    return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(child).asHandler) as ChildHandle
}

invokeOnCompletion 方法的主要作用是把 handler 节点(这里是 ChildHandleNode)添加到父协程状态中的节点链表 (state.list) 里

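GlobalScope.launch 没有父协程:GlobalScope 的 coroutineContext 中不包含 Job,initParentJob 收到的 parent 为 null,于是直接把 parentHandle 设为 NonDisposableHandle,不建立父子关系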

协程完成

协程的完成通过 AbstractCoroutine.resumeWith 实现

// AbstractCoroutine
/**
 * Receives the final result of the coroutine body and tries to complete the job with it.
 *
 * If children are still running (COMPLETING_WAITING_CHILDREN), completion is deferred until
 * they finish; otherwise afterResume is invoked with the resulting state.
 */
public final override fun resumeWith(result: Result<T>) {
    val state = makeCompletingOnce(result.toState())
    if (state === COMPLETING_WAITING_CHILDREN) return
    afterResume(state)
}

调用路径:makeCompletingOnce -> tryMakeCompleting -> tryMakeCompletingSlowPath -> tryWaitForChild:

/**
 * Starts waiting for [child], or for the first still-incomplete child after it.
 *
 * Installs a ChildCompletion handler on the child. A NonDisposableHandle result means the child
 * is already complete, so the next sibling is tried.
 *
 * @return true if the parent is now waiting on some incomplete child, false if no incomplete
 *   child remains.
 */
private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean {
    val handle = child.childJob.invokeOnCompletion(
        invokeImmediately = false,
        handler = ChildCompletion(this, state, child, proposedUpdate).asHandler
    )
    if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
    val nextChild = child.nextChild() ?: return false
    return tryWaitForChild(state, nextChild, proposedUpdate)
}

可知 tryWaitForChild 方法将 ChildCompletion 节点添加到了子协程的 state.list 队列中,当子协程完成或者取消时调用 ChildCompletion.invoke:

// ChildCompletion
// Runs when the awaited child completes (normally or by cancellation) and hands control back
// to the parent so it can wait for the next child or finish itself.
override fun invoke(cause: Throwable?) {
    parent.continueCompleting(state, child, proposedUpdate)
}

/**
 * Continues the parent's completion after [lastChild] has finished: waits for the next
 * incomplete child if any, otherwise finalizes the state and runs afterCompletion.
 */
private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
    assert { this.state === state } // consistency check -- it cannot change while we are waiting for children
    // figure out if we need to wait for next child
    val waitChild = lastChild.nextChild()
    // try wait for next child
    if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
    // no more children to wait -- try update state
    val finalState = finalizeFinishingState(state, proposedUpdate)
    afterCompletion(finalState)
}

父协程需要等待所有子协程处于完成或者取消状态才能完成自身。

协程取消

// JobSupport.kt
/**
 * Cancels this job from the outside; a null [cause] is replaced by a default
 * CancellationException.
 */
// external cancel with cause, never invoked implicitly from internal machinery
public override fun cancel(cause: CancellationException?) {
    cancelInternal(cause ?: defaultCancellationException())
}

/** Overridable cancellation hook; the default simply delegates to cancelImpl. */
public open fun cancelInternal(cause: Throwable) {
    cancelImpl(cause)
}

/**
 * Core cancellation routine shared by external cancel, parent cancellation and child failure.
 *
 * @param cause a Throwable, or the parent job itself when called from parentCancelled.
 * @return false only when it is too late to cancel (TOO_LATE_TO_CANCEL), true otherwise.
 */
internal fun cancelImpl(cause: Any?): Boolean {
    var finalState: Any? = COMPLETING_ALREADY
    if (onCancelComplete) {
        // make sure it is completing; if cancelMakeCompleting returns a state, it has made the
        // job completing and has recorded the exception
        finalState = cancelMakeCompleting(cause)
        if (finalState === COMPLETING_WAITING_CHILDREN) return true
    }
    if (finalState === COMPLETING_ALREADY) {
        finalState = makeCancelling(cause)
    }
    return when {
        finalState === COMPLETING_ALREADY -> true
        finalState === COMPLETING_WAITING_CHILDREN -> true
        finalState === TOO_LATE_TO_CANCEL -> false
        else -> {
            afterCompletion(finalState)
            true
        }
    }
}

makeCancelling() 调用了 notifyCancelling()

// JobSupport.kt
/**
 * Propagates cancellation with [cause]: first to this job's children, then (tentatively) to its
 * parent.
 *
 * @param list this job's own handler list. It contains the ChildHandleNode that each child
 *   registered here through initParentJob -> attachChild, so notifying the JobCancellingNode
 *   handlers cancels the children.
 */
private fun notifyCancelling(list: NodeList, cause: Throwable) {
    // first cancel our own children
    onCancelling(cause)
    // Invoke every JobCancellingNode in the list (each child's ChildHandleNode among them),
    // which cancels the children one by one
    notifyHandlers<JobCancellingNode>(list, cause)
    // then cancel parent (whether the parent is actually cancelled depends on cause and parent type)
    cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
}

下面看看父 parent 如何取消 child,notifyHandlers<JobCancellingNode>(list, cause)

// Base type of handlers that must run as soon as the job starts cancelling
// (notifyCancelling notifies exactly this type).
internal abstract class JobCancellingNode : JobNode()
// Node a child installs in its parent's handler list; the inherited `job` is the parent.
internal class ChildHandleNode(
    @JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
    override val parent: Job get() = job
    override fun invoke(cause: Throwable?) = childJob.parentCancelled(job) // parent is cancelling -> cancel the child
    override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause) // child was cancelled/failed -> notify the parent (`job`)
}
// Child side: called when the parent is cancelling; the child cancels itself,
// passing the parent job as the cause.
public final override fun parentCancelled(parentJob: ParentJob) {
    cancelImpl(parentJob)
}
// Parent side: called when a child is cancelled. A CancellationException is ignored (returns
// true, the parent stays active); any other exception cancels this job, and true is returned
// only if that cancellation succeeded and this job handles exceptions (handlesException).
public open fun childCancelled(cause: Throwable): Boolean {
    if (cause is CancellationException) return true
    return cancelImpl(cause) && handlesException
}

下面看看 child 如何取消 parent,cancelParent():

/**
 * Propagates [cause] to the parent job, if any.
 *
 * @return true if the exception is considered handled (scoped coroutine, CancellationException,
 *   or the parent accepted it); false means this job must handle it itself
 *   (finalizeFinishingState then calls handleJobException).
 */
private fun cancelParent(cause: Throwable): Boolean {
    // If isScopedCoroutine is true, return immediately without propagating to or cancelling the
    // parent. It is false by default; subclasses may override it.
    // Is scoped coroutine -- don't propagate, will be rethrown
    if (isScopedCoroutine) return true

    /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
     * This allow parent to cancel its children (normally) without being cancelled itself, unless
     * child crashes and produce some other exception during its completion.
     */
    val isCancellation = cause is CancellationException
    val parent = parentHandle
    // No parent -- ignore CE, report other exceptions.
    if (parent === null || parent === NonDisposableHandle) {
        return isCancellation
    }

    // Notify parent but don't forget to check cancellation
    return parent.childCancelled(cause) || isCancellation
}
// Coroutine created by supervisorScope.
private class SupervisorCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    // Returning false means a cancelled or failed child never cancels this coroutine:
    // cancellation still flows from parent to children, but not from a child up to the
    // supervisor. For a non-cancellation exception the child's cancelParent then returns false,
    // so the child must handle its own exception (handleJobException).
    override fun childCancelled(cause: Throwable): Boolean = false
}
  1. 协程调用 cancel 时会取消它的所有子协程,默认不会取消它的父协程
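  2. 协程的取消只是在第一层包装 AbstractCoroutine 中修改协程的状态,不会影响到第二层包装 BaseContinuationImpl 中的执行逻辑,即协程的取消只是修改状态,不会直接中断协程的实际执行逻辑。取消是协作式的:被取消后,协程体只有在下一个可取消的挂起点(如 delay、await),或显式检查 isActive / ensureActive() 时,才会抛出 CancellationException 并停止;不含挂起点的 CPU 密集代码会一直运行到结束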

协程异常处理

异常处理入口:BaseContinuationImpl.resumeWith:

// BaseContinuationImpl (kotlin-stdlib), simplified: debug probes are omitted.
class BaseContinuationImpl {
    /**
     * Resumes this state machine with [result], then keeps resuming the chain of callers.
     *
     * Instead of recursing into each caller's resumeWith, it loops: when a frame finishes, its
     * outcome becomes the input of its `completion` frame. An exception thrown by the coroutine
     * body is caught and turned into Result.failure, which is how failures enter the coroutine
     * machinery. The loop stops when a frame suspends or when the top-level completion
     * (the AbstractCoroutine) is reached.
     */
    fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        // unrolling recursion via loop
        while (true) {
            with(current) {
                val completion = completion!! // fail fast when trying to resume a continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        // The body suspended again: stop here, it will be resumed later.
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        // An exception thrown by the coroutine body is captured as the outcome.
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // The caller is another compiled state machine: continue the loop with it.
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}

在捕获了异常后,调用 AbstractCoroutine.resumeWith 来处理,其流程为:
AbstractCoroutine.resumeWith -> JobSupport.makeCompletingOnce -> JobSupport.tryMakeCompleting -> JobSupport.tryMakeCompletingSlowPath

/**
 * Slow path of tryMakeCompleting. It moves the job into a Finishing state and records the
 * exception carried by [proposedUpdate] (if any). If the job has just become cancelling, it
 * cancels the children. Then it either waits for the children or finalizes the state.
 *
 * @return COMPLETING_RETRY if the state changed concurrently, COMPLETING_ALREADY if the job was
 *   already completing, COMPLETING_WAITING_CHILDREN if some child is still running, otherwise
 *   the final state.
 */
private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
    val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
    val finishing = state as? Finishing ?: Finishing(list, false, null)
    var notifyRootCause: Throwable? = null
    synchronized(finishing) {
        if (finishing.isCompleting) return COMPLETING_ALREADY
        finishing.isCompleting = true
        if (finishing !== state) {
            if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
        }
        val wasCancelling = finishing.isCancelling
        // Record the exception of a failed body in the Finishing state
        (proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
        // If it just becomes cancelling --> must process cancelling notifications
        notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
    }
    // process cancelling notification here -- it cancels all the children _before_ we start to wait for them (sic!!!)
    notifyRootCause?.let { notifyCancelling(list, it) }
    val child = firstChild(state) // now wait for children
    if (child != null && tryWaitForChild(finishing, child, proposedUpdate)) return COMPLETING_WAITING_CHILDREN
    // otherwise -- we have no children left (all were already cancelled?)
    return finalizeFinishingState(finishing, proposedUpdate)
}
  1. 当协程发生异常时会取消它的所有子协程,默认会取消它的父协程

接下来看看 finalizeFinishingState 方法:

// JobSupport.kt
// Excerpt: only the part of finalizeFinishingState that deals with an exceptional final state.
private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? {
    // ...
    if (finalException != null) {
        // Offer the exception to the parent first; only if it is not handled there does this job
        // handle it itself (handleJobException, e.g. StandaloneCoroutine created by launch).
        val handled = cancelParent(finalException) || handleJobException(finalException)
        // Record on the final state that the exception was handled.
        if (handled) (finalState as CompletedExceptionally).makeHandled()
    }
    // ...
}
  1. 若异常是 CancellationException,cancelParent 返回 true(视为已处理):子协程因 CancellationException 结束时,父协程不会取消自己,该异常被忽略
  2. 如果协程抛出未捕获的非取消异常,则会一步步取消上层的协程,最后根协程调用 handleJobException 处理异常
// JobSupport.kt
/**
 * Handles an exception that was not handled by the parent coroutine.
 * The default implementation does nothing and returns false; subclasses such as
 * StandaloneCoroutine override it.
 *
 * @return true if the exception was handled.
 */
protected open fun handleJobException(exception: Throwable): Boolean = false

重写了该方法的实现类有 StandaloneCoroutine 和 ActorCoroutine

// Coroutine created by launch for a non-lazy start. It has no result value (AbstractCoroutine<Unit>),
// so an exception that reaches it is reported through handleCoroutineException.
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
    // Called when the parent did not handle the exception; always reports it and returns true.
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}

调用 handleCoroutineException 来处理异常:

// CoroutineExceptionHandler.kt
/**
 * Last-resort handling of an exception that no parent coroutine handled.
 *
 * Uses the CoroutineExceptionHandler from [context] if there is one. If there is none, or if that
 * handler throws, falls back to handleCoroutineExceptionImpl.
 */
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
    // Invoke an exception handler from the context if present
    try {
        context[CoroutineExceptionHandler]?.let { // a CoroutineExceptionHandler in the context handles it
            it.handleException(context, exception)
            return
        }
    } catch (t: Throwable) {
        // The handler itself threw: report via the fallback path, combined by handlerException.
        handleCoroutineExceptionImpl(context, handlerException(exception, t))
        return
    }
    // If a handler is not present in the context or an exception was thrown, fallback to the global handler
    handleCoroutineExceptionImpl(context, exception)
}

// CoroutineExceptionHandlerImpl.kt
// Global CoroutineExceptionHandler implementations discovered via ServiceLoader;
// on Android this also includes AndroidExceptionPreHandler.
private val handlers: List<CoroutineExceptionHandler> = ServiceLoader.load(
    CoroutineExceptionHandler::class.java,
    CoroutineExceptionHandler::class.java.classLoader
).iterator().asSequence().toList()
/**
 * Fallback handling: passes [exception] to every ServiceLoader-registered handler, then always
 * to the current thread's uncaught exception handler.
 */
internal actual fun handleCoroutineExceptionImpl(context: CoroutineContext, exception: Throwable) {
    // use additional extension handlers
    for (handler in handlers) {
        try {
            handler.handleException(context, exception)
        } catch (t: Throwable) {
            // Use thread's handler if custom handler failed to handle exception
            val currentThread = Thread.currentThread()
            currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, handlerException(exception, t))
        }
    }

    // use thread's handler
    val currentThread = Thread.currentThread()
    currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
}
  1. 不要在 CoroutineExceptionHandler 中再抛出异常,这是最后一道能处理异常的地方
  2. 首先从 CoroutineContext 中取出 CoroutineExceptionHandler 来处理异常
  3. 如果没有 CoroutineExceptionHandler,或者 CoroutineExceptionHandler 自身又抛出了异常,则交给 handleCoroutineExceptionImpl 处理
  4. handleCoroutineExceptionImpl 先依次调用通过 ServiceLoader 加载的全局 handler,最后总会把异常交给当前线程的 UncaughtExceptionHandler 处理
  5. AndroidExceptionPreHandler 是通过 SPI(ServiceLoader)机制注册的 CoroutineExceptionHandler 实现

结构化并发 (Structured Concurrency) 原理

疑问

协程何时需要线程切换?context[ContinuationInterceptor] 什么时候有值

当 CoroutineContext 中包含 ContinuationInterceptor(例如 CoroutineDispatcher)时,协程在启动和恢复时才会经过它,从而可能切换线程;

在 newCoroutineContext 中,如果合并后的上下文里没有 ContinuationInterceptor,会默认添加 Dispatchers.Default,因此 context[ContinuationInterceptor] 通常都会有值

协程如何切线程?

Continuation.intercepted(),ContinuationInterceptor 拦截 Continuation,而 CoroutineDispatcher 实现了 ContinuationInterceptor,所以协程的切换是以拦截器的方式实现的。

协程如何处理异常?

入口:
在 BaseContinuationImpl.resumeWith,Result.failure(exception)

CancellationException 异常,会被忽略掉,不会取消父协程,只会取消其下所有子协程

/**
 * Propagates [cause] to the parent job, if any.
 *
 * @return true if the exception is considered handled (scoped coroutine, CancellationException,
 *   or the parent accepted it); false means this job must handle it itself.
 */
private fun cancelParent(cause: Throwable): Boolean {
    // Is scoped coroutine -- don't propagate, will be rethrown
    if (isScopedCoroutine) return true

    /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
     * This allow parent to cancel its children (normally) without being cancelled itself, unless
     * child crashes and produce some other exception during its completion.
     */
    val isCancellation = cause is CancellationException
    val parent = parentHandle
    // No parent -- ignore CE, report other exceptions.
    if (parent === null || parent === NonDisposableHandle) {
        return isCancellation
    }

    // Notify parent but don't forget to check cancellation
    return parent.childCancelled(cause) || isCancellation
}